package com.lm.camsFlink;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that copies rows from the MySQL table {@code t_cr_menu} (read through the
 * {@code mysql-cdc} connector) into the ClickHouse table {@code t_department_ch}.
 *
 * <p>Connection settings can be overridden with environment variables, read by the JVM that
 * runs {@link #main(String[])}. When a variable is unset or empty, the built-in
 * local-development default is used:
 *
 * <ul>
 *   <li>{@code CAMS_FLINK_MYSQL_HOSTNAME} (default {@code localhost})
 *   <li>{@code CAMS_FLINK_MYSQL_PORT} (default {@code 3306})
 *   <li>{@code CAMS_FLINK_MYSQL_USERNAME} (default {@code root})
 *   <li>{@code CAMS_FLINK_MYSQL_PASSWORD}
 *   <li>{@code CAMS_FLINK_MYSQL_DATABASE} (default {@code scp_core})
 *   <li>{@code CAMS_FLINK_CLICKHOUSE_URL}
 *       (default {@code jdbc:clickhouse://localhost:8123/default})
 *   <li>{@code CAMS_FLINK_CLICKHOUSE_USERNAME} (default {@code default})
 *   <li>{@code CAMS_FLINK_CLICKHOUSE_PASSWORD}
 * </ul>
 */
public class FlinkJob {

    /** Statement that pipes every source row into the sink table. */
    private static final String COPY_STATEMENT =
            "insert into t_department_ch select * from t_cr_menu";

    /**
     * Registers the source and sink tables, then submits the INSERT statement.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if any of the SQL statements fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // NOTE(review): some mysql-cdc versions only move from the snapshot phase to binlog
        // reading after a completed checkpoint -- confirm checkpointing is enabled in the
        // deployment configuration.
        tableEnv.executeSql(mysqlSourceDdl());
        tableEnv.executeSql(clickHouseSinkDdl());

        // NOTE(review): per the Flink Table API documentation, executeSql() submits INSERT jobs
        // asynchronously, so main() returns once the job has been submitted. Confirm this is
        // what the chosen deployment mode expects.
        tableEnv.executeSql(COPY_STATEMENT);
    }

    /** Builds the CREATE TABLE statement for the MySQL CDC source table {@code t_cr_menu}. */
    private static String mysqlSourceDdl() {
        return "CREATE TABLE t_cr_menu (\n"
                + "  id STRING,\n"
                + "  name STRING,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + option("connector", "mysql-cdc") + ",\n"
                + option("hostname", setting("CAMS_FLINK_MYSQL_HOSTNAME", "localhost")) + ",\n"
                + option("port", setting("CAMS_FLINK_MYSQL_PORT", "3306")) + ",\n"
                + option("username", setting("CAMS_FLINK_MYSQL_USERNAME", "root")) + ",\n"
                + option("password", setting("CAMS_FLINK_MYSQL_PASSWORD", "Lm123!")) + ",\n"
                + option("database-name", setting("CAMS_FLINK_MYSQL_DATABASE", "scp_core")) + ",\n"
                + option("table-name", "t_cr_menu") + "\n"
                + ")";
    }

    /** Builds the CREATE TABLE statement for the ClickHouse sink table {@code t_department_ch}. */
    private static String clickHouseSinkDdl() {
        return "CREATE TABLE t_department_ch (\n"
                + "  id STRING,\n"
                + "  name STRING,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + option("connector", "clickhouse") + ",\n"
                + option("url", setting("CAMS_FLINK_CLICKHOUSE_URL",
                        "jdbc:clickhouse://localhost:8123/default")) + ",\n"
                + option("userName", setting("CAMS_FLINK_CLICKHOUSE_USERNAME", "default")) + ",\n"
                + option("password", setting("CAMS_FLINK_CLICKHOUSE_PASSWORD", "Lm123!")) + ",\n"
                + option("tableName", "t_department_ch") + "\n"
                + ")";
    }

    /** Returns the value of environment variable {@code envName}, or {@code fallback} if unset or empty. */
    private static String setting(String envName, String fallback) {
        String value = System.getenv(envName);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /** Renders one {@code 'key' = 'value'} entry of a WITH clause, doubling single quotes in the value. */
    private static String option(String key, String value) {
        // A single quote inside a SQL string literal is escaped by doubling it; without this an
        // externally supplied value (e.g. a password) containing ' would break the DDL.
        return "  '" + key + "' = '" + value.replace("'", "''") + "'";
    }
}
